//! HTTP Task Manager API implementation
//!
//! This file contains the implementation of the HTTP Task manager defined in `wit/component.wit`.
//!
//! The top level `world` in the [WIT][wit] for this WebAssembly component looks like the following:
//! ```text
//! package wasmcloud:task-manager@0.1.0-draft;
//!
//! world http-task-manager {
//!   import wasmcloud:postgres/query@0.1.1-draft;
//!
//!   export tasks;
//!   export wasi:http/incoming-handler@0.2.1;
//! }
//! ```
//!
//! This WebAssembly component:
//!
//! - Uses Postgres for persistence (i.e. the [wasmcloud SQLDB Postgres provider][wasmcloud-postgres-provider])
//! - Exports interfaces for task management
//! - Exports a `wasi:http` incoming handler so it can be used as a web server (i.e. by the [wasmCloud HTTP server provider][wasmcloud-http-server-provider])
//!
//! [wit]: <https://github.com/WebAssembly/component-model/blob/main/design/mvp/WIT.md>
//! [wasmcloud-postgres-provider]: <https://github.com/wasmCloud/wasmCloud/tree/main/crates/provider-sqldb-postgres>
//! [wasmcloud-http-server-provider]: <https://github.com/wasmCloud/wasmCloud/tree/main/crates/provider-http-server>

pub(crate) mod bindings {
    //! Bindings generated by wit-bindgen for this component's WIT world.
    //!
    //! The rest of the crate imports the generated interface types and traits from here
    //! (e.g. `bindings::exports::wasi::http::incoming_handler`).

    // The type on which the exported interfaces are implemented (see the crate root)
    use crate::HttpTaskManager;

    // Expands to the generated modules; `generate_all` is a wit-bindgen option
    // NOTE(review): presumably this makes wit-bindgen generate bindings for every
    // interface the world references -- confirm against the wit-bindgen docs
    wit_bindgen::generate!({
        generate_all,
    });
    // Registers `HttpTaskManager` as the implementation of the world's exports
    export!(HttpTaskManager);
}

use std::collections::HashMap;
use std::str::FromStr;

use anyhow::{bail, Context as _, Result};
use serde_json::json;

// NOTE: the exports below were generated by wit-bindgen
use bindings::exports::wasi::http::incoming_handler;
use bindings::exports::wasmcloud::task_manager::tasks;
use bindings::wasi::http::types::{
    Fields, IncomingRequest, Method, OutgoingBody, OutgoingResponse, ResponseOutparam,
};
use bindings::wasi::logging::logging::{log, Level};
use bindings::wasmcloud::postgres::query::query;
use bindings::wasmcloud::postgres::types::PgValue;
use bindings::wasmcloud::task_manager::types::{
    GroupId, JsonString, Task, TaskId, TaskStatus, WorkerId,
};

use crate::tasks::{
    GetTaskError, GetTasksError, GetTasksQueryOptions, LeaseId, OffsetPagination, SubmitTaskError,
    UpdateTaskError,
};
// END wit-bindgen generated imports

mod db;
use db::extract_nonnull_uuid_column;
use db::migrations::perform_migrations;

mod http;
use http::{
    send_response_json, try_send_error, TaskCompleteRequestBody, TaskFailedRequestBody,
    TaskLeaseRequestBody, TaskReleaseRequestBody, TaskSubmitRequestBody,
};

mod serde;

/// Context string passed to `log` and `try_send_error!` by this component
const LOG_CONTEXT: &str = "http-task-manager";

/// Page size used by `GET /api/v1/tasks` when the `page_size` query parameter is missing
/// or does not parse as a `u32`
const DEFAULT_PAGINATION_PAGE_SIZE: u32 = 20;
/// Offset used by `GET /api/v1/tasks` when the `offset` query parameter is missing
/// or does not parse as a `u32`
const DEFAULT_PAGINATION_OFFSET: u32 = 0;

/// Field-less type on which this component's exported interfaces are implemented.
///
/// It is handed to the wit-bindgen `export!` macro in [`bindings`] and implements both
/// `incoming_handler::Guest` (the HTTP API) and `tasks::Guest` (the task interface).
struct HttpTaskManager;

// Implementation of wasi:http/incoming-handler
impl incoming_handler::Guest for HttpTaskManager {
    /// Route a single incoming HTTP request and answer it through `response_out`.
    ///
    /// Supported routes:
    ///
    /// - `GET  /ready`
    /// - `POST /admin/v1/db/migrate`
    /// - `GET  /api/v1/tasks` (query params: `offset`, `page_size`, `group_id`)
    /// - `POST /api/v1/tasks`
    /// - `POST /api/v1/tasks/:task_id/complete`
    /// - `POST /api/v1/tasks/:task_id/fail`
    /// - `POST /api/v1/tasks/:task_id/lease`
    /// - `POST /api/v1/tasks/:task_id/release`
    /// - `GET  /api/v1/tasks/:task_id`
    ///
    /// Anything else is answered with a 404. Malformed URLs and request bodies are
    /// answered with a 400; failures of the underlying task operations with a 500.
    fn handle(request: IncomingRequest, response_out: ResponseOutparam) {
        // Decode method, path and query params
        let method = request.method();
        let path_with_query = request.path_with_query();
        let full_path = path_with_query.as_deref().unwrap_or("/");
        let (path, query) = full_path.split_once('?').unwrap_or((full_path, ""));

        // NOTE: keys and values are used verbatim (no percent-decoding is performed),
        // and pairs without a `=` are ignored
        let query_params = query
            .split('&')
            .filter_map(|v| v.split_once('='))
            .collect::<HashMap<&str, &str>>();

        // The content type has to be inspected before the body is read
        let is_json_req = request.is_json();
        let req_body = match request.read_body() {
            Ok(rb) => rb,
            Err(e) => {
                try_send_error!(
                    LOG_CONTEXT,
                    500,
                    "invalid-body",
                    &format!("failed to receive incoming body: {e}"),
                    response_out
                );
                return;
            }
        };

        // Route the HTTP requests
        match (method, path) {
            // GET /ready
            //
            // Simple endpoint to indicate that this component is ready to be used
            (Method::Get, "/ready") => {
                send_response_json(response_out, json!({"status": "success" }), 200);
            }

            // POST /admin/v1/db/migrate
            //
            // This endpoint performs a DB migration
            (Method::Post, "/admin/v1/db/migrate") => {
                match perform_migrations() {
                    Ok(()) => send_response_json(response_out, json!({ "status": "success" }), 200),
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "db-migration-failure",
                            &format!("failed to perform migrations: {e}"),
                            response_out
                        );
                    }
                };
            }

            // GET /api/v1/tasks
            //
            // Retrieve tasks in a database:
            // - all tasks
            // - tasks of a given group (by specifying query param `group_id`)
            (Method::Get, "/api/v1/tasks") => {
                // Missing or unparsable pagination values silently fall back to the defaults
                let offset = query_params
                    .get("offset")
                    .and_then(|v| v.parse::<u32>().ok())
                    .unwrap_or(DEFAULT_PAGINATION_OFFSET);
                let page_size = query_params
                    .get("page_size")
                    .and_then(|v| v.parse::<u32>().ok())
                    .unwrap_or(DEFAULT_PAGINATION_PAGE_SIZE);
                let group_id = query_params.get("group_id").map(|v| String::from(*v));

                let tasks = match <Self as tasks::Guest>::get_tasks(GetTasksQueryOptions {
                    group_id,
                    pagination: OffsetPagination { offset, page_size },
                }) {
                    Ok(ts) => ts,
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-retrieval-failure",
                            &format!("failed to retrieve tasks: {e}"),
                            response_out
                        );
                        return;
                    }
                };

                send_response_json(
                    response_out,
                    json!({
                        "status": "success",
                        "data": tasks,
                    }),
                    200,
                );
            }

            // POST /api/v1/tasks
            //
            // Submit a single new task
            (Method::Post, "/api/v1/tasks") => {
                if !is_json_req {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-content-type",
                        "Invalid content type, must be application/json",
                        response_out
                    );
                    return;
                }

                let TaskSubmitRequestBody {
                    group_id,
                    task_data,
                } = match serde_json::from_slice(&req_body) {
                    Ok(rb) => rb,
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            400,
                            "invalid-req-body",
                            &format!("failed to parse task submit data from request body: {e}"),
                            response_out
                        );
                        return;
                    }
                };
                if !task_data.is_object() {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-req-body",
                        "task data must be a JSON object",
                        response_out
                    );
                    return;
                }

                match <Self as tasks::Guest>::submit_task(group_id, task_data.to_string()) {
                    Ok(task) => {
                        send_response_json(
                            response_out,
                            json!({
                                "status": "success",
                                "data": task,
                            }),
                            200,
                        );
                    }
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-submission-failed",
                            &format!("task submission failed: {e}"),
                            response_out
                        );
                    }
                }
            }

            // POST /api/v1/tasks/:task_id/complete
            //
            // Mark a single task as completed
            (Method::Post, path)
                if path.starts_with(TASKS_PATH_PREFIX) && path.ends_with("/complete") =>
            {
                let Some(task_id) = task_id_from_path(path, Some("complete")) else {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-url",
                        "unexpected URL pattern, missing task_id segment",
                        response_out
                    );
                    return;
                };

                let TaskCompleteRequestBody { worker_id } = match serde_json::from_slice(&req_body)
                {
                    Ok(rb) => rb,
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            400,
                            "invalid-req-body",
                            &format!("failed to parse task complete data from request body: {e}"),
                            response_out
                        );
                        return;
                    }
                };

                match <Self as tasks::Guest>::mark_task_completed(task_id.to_owned(), worker_id) {
                    Ok(()) => {
                        send_response_json(response_out, json!({ "status": "success" }), 200);
                    }
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-completion-err",
                            &format!("marking task completed failed: {e}"),
                            response_out
                        );
                    }
                }
            }

            // POST /api/v1/tasks/:task_id/fail
            //
            // Mark a single task as failed
            (Method::Post, path)
                if path.starts_with(TASKS_PATH_PREFIX) && path.ends_with("/fail") =>
            {
                let Some(task_id) = task_id_from_path(path, Some("fail")) else {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-url",
                        "unexpected URL pattern, missing task_id segment",
                        response_out
                    );
                    return;
                };

                let TaskFailedRequestBody { worker_id, reason } =
                    match serde_json::from_slice(&req_body) {
                        Ok(rb) => rb,
                        Err(e) => {
                            try_send_error!(
                                LOG_CONTEXT,
                                400,
                                "invalid-req-body",
                                &format!(
                                    "failed to parse task failure data from request body: {e}"
                                ),
                                response_out
                            );
                            return;
                        }
                    };

                match <Self as tasks::Guest>::mark_task_failed(
                    task_id.to_owned(),
                    worker_id,
                    reason,
                ) {
                    Ok(()) => {
                        send_response_json(response_out, json!({"status": "success" }), 200);
                    }
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-failure-err",
                            &format!("marking task failed failed: {e}"),
                            response_out
                        );
                    }
                }
            }

            // POST /api/v1/tasks/:task_id/lease
            //
            // Lease a single task for work
            (Method::Post, path)
                if path.starts_with(TASKS_PATH_PREFIX) && path.ends_with("/lease") =>
            {
                let Some(task_id) = task_id_from_path(path, Some("lease")) else {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-url",
                        "unexpected URL pattern: missing task_id segment",
                        response_out
                    );
                    return;
                };

                let TaskLeaseRequestBody { worker_id } = match serde_json::from_slice(&req_body) {
                    Ok(rb) => rb,
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            400,
                            "invalid-req-body",
                            &format!("failed to parse task lease data from request body: {e}"),
                            response_out
                        );
                        return;
                    }
                };

                match <Self as tasks::Guest>::lease_task(task_id.to_owned(), worker_id) {
                    Ok(lease_id) => {
                        send_response_json(
                            response_out,
                            json!({
                                "status": "success",
                                "data": lease_id,
                            }),
                            200,
                        );
                    }
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-lease-err",
                            &format!("leasing task failed: {e}"),
                            response_out
                        );
                    }
                }
            }

            // POST /api/v1/tasks/:task_id/release
            //
            // Release a single task from being worked on
            (Method::Post, path)
                if path.starts_with(TASKS_PATH_PREFIX) && path.ends_with("/release") =>
            {
                let Some(task_id) = task_id_from_path(path, Some("release")) else {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-url",
                        "unexpected URL pattern: missing task_id segment",
                        response_out
                    );
                    return;
                };

                let TaskReleaseRequestBody {
                    worker_id,
                    lease_id,
                } = match serde_json::from_slice(&req_body) {
                    Ok(rb) => rb,
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            400,
                            "invalid-req-body",
                            &format!("failed to parse task release data from request body: {e}"),
                            response_out
                        );
                        return;
                    }
                };

                match <Self as tasks::Guest>::release_task(
                    task_id.to_owned(),
                    worker_id,
                    lease_id,
                ) {
                    Ok(()) => {
                        send_response_json(response_out, json!({ "status": "success" }), 200);
                    }
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-release-err",
                            &format!("releasing task failed: {e}"),
                            response_out
                        );
                    }
                }
            }

            // GET /api/v1/tasks/:task_id
            //
            // Retrieve a single task from the database
            //
            // NOTE: beware of match precedence, this route has to come last
            (Method::Get, path) if path.starts_with(TASKS_PATH_PREFIX) => {
                let Some(task_id) = task_id_from_path(path, None) else {
                    try_send_error!(
                        LOG_CONTEXT,
                        400,
                        "invalid-url",
                        "unexpected URL pattern: missing task_id segment",
                        response_out
                    );
                    return;
                };

                let task = match <Self as tasks::Guest>::get_task(task_id.to_owned()) {
                    Ok(t) => t,
                    Err(e) => {
                        try_send_error!(
                            LOG_CONTEXT,
                            500,
                            "task-retrieval-failure",
                            &format!("failed to retrieve task: {e}"),
                            response_out
                        );
                        return;
                    }
                };

                send_response_json(
                    response_out,
                    json!({
                        "status": "success",
                        "data": &task,
                    }),
                    200,
                );
            }

            // For all other requests, send a 404 with a JSON body that indicates the invalid endpoint
            _ => send_response_json(
                response_out,
                json!({
                    "status": "error",
                    "error": {
                        "code": "invalid-endpoint",
                        "message": format!("no such endpoint [{path}]"),
                    }
                }),
                404,
            ),
        }
    }
}

/// Path prefix shared by every route that addresses a single task
const TASKS_PATH_PREFIX: &str = "/api/v1/tasks/";

/// Extract the `:task_id` segment from a single-task route path.
///
/// With `action == Some("complete")` the expected shape is
/// `/api/v1/tasks/:task_id/complete`; with `action == None` it is `/api/v1/tasks/:task_id`.
///
/// Returns `None` when the path does not have exactly that shape, i.e. when the prefix or
/// action suffix is missing, when the task ID is empty, or when it spans several segments.
fn task_id_from_path<'a>(path: &'a str, action: Option<&str>) -> Option<&'a str> {
    let rest = path.strip_prefix(TASKS_PATH_PREFIX)?;
    let task_id = match action {
        // Strip the action first, then the `/` that separates it from the task ID
        Some(action) => rest.strip_suffix(action)?.strip_suffix('/')?,
        None => rest,
    };
    (!task_id.is_empty() && !task_id.contains('/')).then_some(task_id)
}

// Implementation of our home-grown tasks interface
impl tasks::Guest for HttpTaskManager {
    fn get_tasks(
        GetTasksQueryOptions {
            group_id,
            pagination,
        }: GetTasksQueryOptions,
    ) -> Result<Vec<Task>, GetTasksError> {
        let rows = match group_id {
            // Query for all tasks with the given Group ID
            Some(group_id) => {
                match query(
                    db::queries::GET_TASKS_BY_GROUP_ID,
                    &[
                        PgValue::Text(group_id),
                        PgValue::BigInt(i64::from(pagination.offset)),
                        PgValue::BigInt(i64::from(pagination.page_size)),
                    ],
                ) {
                    Ok(rows) => rows,
                    Err(e) => {
                        log(
                            Level::Error,
                            LOG_CONTEXT,
                            &format!("error querying tasks: {e}"),
                        );
                        return Err(GetTasksError::Unexpected(format!(
                            "failed to query tasks: {e}"
                        )));
                    }
                }
            }

            // Query for all tasks
            None => {
                match query(
                    db::queries::GET_TASKS,
                    &[
                        PgValue::BigInt(i64::from(pagination.offset)),
                        PgValue::BigInt(i64::from(pagination.page_size)),
                    ],
                ) {
                    Ok(rows) => rows,
                    Err(e) => {
                        log(
                            Level::Error,
                            LOG_CONTEXT,
                            &format!("error querying tasks: {e}"),
                        );
                        return Err(GetTasksError::Unexpected(format!(
                            "failed to query tasks: {e}"
                        )));
                    }
                }
            }
        };

        // Convert all DB rows to tasks
        let tasks = rows
            .iter()
            .map(|r| {
                Task::try_from(r)
                    .map_err(|e| GetTasksError::Unexpected(format!("failed to convert task: {e}")))
            })
            .collect::<Result<Vec<Task>, GetTasksError>>()?;

        Ok(tasks)
    }

    fn get_task(task_id: TaskId) -> Result<Task, GetTaskError> {
        // Query for a single task
        let rows = match query(db::queries::GET_TASK_BY_ID, &[PgValue::Uuid(task_id)]) {
            Ok(rows) => rows,
            Err(e) => {
                log(
                    Level::Error,
                    LOG_CONTEXT,
                    &format!("error querying task by ID: {e}"),
                );
                return Err(GetTaskError::Unexpected(
                    "failed to query task by ID".into(),
                ));
            }
        };

        // Ensure that there is only one task with
        match &rows[..] {
            [r] => Task::try_from(r)
                .map_err(|e| GetTaskError::Unexpected(format!("failed to map task: {e}"))),
            _ => Err(GetTaskError::Unexpected(
                "unexpected number of tasks (expected only one with given task ID and group ID)"
                    .into(),
            )),
        }
    }

    fn submit_task(group_id: GroupId, task_data: JsonString) -> Result<Task, SubmitTaskError> {
        // Ensure the task is valid JSON and an object
        match serde_json::from_slice::<serde_json::Value>(task_data.as_bytes()) {
            Ok(obj) => {
                if !obj.is_object() {
                    return Err(SubmitTaskError::InvalidTaskData(
                        "task data must be a JSON object".into(),
                    ));
                }
            }
            Err(e) => return Err(SubmitTaskError::InvalidTaskData(e.to_string())),
        }

        // Insert a new task
        let rows = match query(
            db::queries::INSERT_TASK,
            &[PgValue::Text(group_id), PgValue::Jsonb(task_data)],
        )
        .map_err(|e| SubmitTaskError::Unexpected(format!("failed to insert task: {e}")))
        {
            Ok(rows) => rows,
            Err(e) => {
                log(
                    Level::Error,
                    LOG_CONTEXT,
                    &format!("error inserting task: {e}"),
                );
                return Err(SubmitTaskError::Unexpected("failed to insert task".into()));
            }
        };

        let row = rows.first().ok_or_else(|| {
            SubmitTaskError::Unexpected("no rows returned from task creation".into())
        })?;

        Task::try_from(row)
            .map_err(|e| SubmitTaskError::Unexpected(format!("failed to build task from row: {e}")))
    }

    fn mark_task_completed(task_id: TaskId, worker_id: WorkerId) -> Result<(), UpdateTaskError> {
        // Mark the task complete
        let rows = query(
            db::queries::MARK_TASK_COMPLETE,
            &[PgValue::Uuid(task_id), PgValue::Text(worker_id)],
        )
        .map_err(|e| {
            UpdateTaskError::Unexpected(format!("failed to mark task completed task: {e}"))
        })?;

        // Ensure that only one row was returned (i.e. one task was modified)
        match &rows[..] {
            [_r] => Ok(()),
            _ => Err(UpdateTaskError::Unexpected(
                "unexpected number of tasks while marking completion (expected only one)".into(),
            )),
        }
    }

    fn mark_task_failed(
        task_id: TaskId,
        worker_id: WorkerId,
        reason: String,
    ) -> Result<(), UpdateTaskError> {
        // Mark the task failed
        let rows = query(
            db::queries::MARK_TASK_FAILED,
            &[
                PgValue::Uuid(task_id),
                PgValue::Text(worker_id),
                PgValue::Text(reason),
            ],
        )
        .map_err(|e| UpdateTaskError::Unexpected(format!("failed to mark task failed: {e}")))?;

        // Ensure that only one row was returned (i.e. one task was modified)
        match &rows[..] {
            [_r] => Ok(()),
            _ => Err(UpdateTaskError::Unexpected(
                "unexpected number of tasks while marking failure (expected only one)".into(),
            )),
        }
    }

    fn lease_task(task_id: TaskId, worker_id: WorkerId) -> Result<LeaseId, UpdateTaskError> {
        // Lease the task
        let rows = query(
            db::queries::LEASE_TASK,
            &[PgValue::Uuid(task_id), PgValue::Text(worker_id)],
        )
        .map_err(|e| UpdateTaskError::Unexpected(format!("failed to lease task: {e}")))?;

        // Ensure that only one row was returned (i.e. one task was modified)
        match &rows[..] {
            [r] => extract_nonnull_uuid_column(r, "lease_id")
                .map(ToString::to_string)
                .context("failed to find lease_id column for leased Task")
                .map_err(|e| UpdateTaskError::Unexpected(format!("failed to lease task: {e}"))),
            [] => Err(UpdateTaskError::Unexpected(
                "lease failed, no tasks updated".into(),
            )),
            _ => Err(UpdateTaskError::Unexpected(
                "unexpected number of tasks while leasing task (expected only one)".into(),
            )),
        }
    }

    fn release_task(
        task_id: TaskId,
        worker_id: WorkerId,
        lease_id: LeaseId,
    ) -> Result<(), UpdateTaskError> {
        // Release the task for other workers
        let rows = query(
            db::queries::RELEASE_TASK,
            &[
                PgValue::Uuid(task_id),
                PgValue::Uuid(lease_id),
                PgValue::Text(worker_id),
            ],
        )
        .map_err(|e| UpdateTaskError::Unexpected(format!("failed to release task: {e}")))?;

        // Ensure that only one row was returned (i.e. one task was modified)
        match &rows[..] {
            [_r] => Ok(()),
            _ => Err(UpdateTaskError::Unexpected(
                "unexpected number of tasks while leasing task (expected only one)".into(),
            )),
        }
    }
}

impl From<anyhow::Error> for GetTaskError {
    fn from(err: anyhow::Error) -> GetTaskError {
        GetTaskError::Unexpected(err.to_string())
    }
}

impl FromStr for TaskStatus {
    type Err = anyhow::Error;

    /// Parse the lowercase textual form of a status (`pending`, `leased`, `completed`,
    /// `failed`); any other input is an error.
    fn from_str(s: &str) -> Result<Self> {
        let status = match s {
            "pending" => Self::Pending,
            "leased" => Self::Leased,
            "completed" => Self::Completed,
            "failed" => Self::Failed,
            _ => bail!("invalid TaskStatus value [{s}]"),
        };
        Ok(status)
    }
}
